package com.mugui.spring.net.websocket;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.websocket.Session;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.mugui.spring.base.Task;
import com.mugui.spring.base.TaskInterface;
import com.mugui.spring.net.baghandle.NetHandle;
import com.mugui.spring.net.bean.NetBag;

import cn.hutool.cache.GlobalPruneTimer;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.Getter;

@Component
@Task(time = 0, value = Task.DEFAULT)
public final class WebSocketTask extends HashMap<Integer, ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>>>
		implements TaskInterface {

	/**
	 * Serialization version identifier; declared because this class extends the
	 * serializable {@link HashMap}.
	 */
	private static final long serialVersionUID = 5595734246734609508L;

	/**
	 * Returns the session map registered under the handler's value, creating it
	 * on first use.
	 * <p>
	 * Lookup and creation happen in a single atomic
	 * {@link ConcurrentHashMap#computeIfAbsent} call, so two threads subscribing
	 * to the same handler at the same time always end up sharing one map instead
	 * of one of them silently overwriting the other's.
	 *
	 * @author mugui
	 * @param map           per-type registry keyed by
	 *                      {@link WebSocketBean#getValue()}
	 * @param webSocketBean handler whose value selects the session map
	 * @return the existing or newly created session map, never {@code null}
	 */
	private ConcurrentHashMap<Session, NetBag> getSessionMap(
			ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>> map, WebSocketBean webSocketBean) {
		return map.computeIfAbsent(webSocketBean.getValue(), value -> new ConcurrentHashMap<>());
	}

	/**
	 * Resolves the websocket handler registered under the given key.
	 *
	 * @author mugui
	 * @param key lookup key passed to the websocket manager
	 * @return the handler found for {@code key}
	 * @throws RuntimeException if nothing is registered under {@code key} or the
	 *                          registered object is not a {@link WebSocketBean}
	 */
	private WebSocketBean getWebSocketBean(String key) {
		Object handler = webSocketManager.get(key);
		// instanceof is false for null, so this single test covers both failure cases.
		if (handler instanceof WebSocketBean) {
			return (WebSocketBean) handler;
		}
		throw new RuntimeException(handler + " 未发现的websocket处理器，关于：" + key);
	}

	/**
	 * Returns the registry that holds all session maps for one
	 * {@link WebSocketBean#getType()} value, creating it on first use.
	 * <p>
	 * The registries live in the {@link HashMap} this class extends, which is not
	 * safe for concurrent modification. Because this method is reached from
	 * subscriber threads as well as from the scheduled executor, the
	 * lookup-or-create step is guarded by the instance monitor.
	 *
	 * @author mugui
	 * @param type handler type used as the key
	 * @return the existing or newly created registry, never {@code null}
	 */
	private ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>> getMap(int type) {
		synchronized (this) {
			ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>> registry = get(type);
			if (registry == null) {
				registry = new ConcurrentHashMap<>();
				put(type, registry);
			}
			return registry;
		}
	}

	/**
	 * Subscribes a websocket session to the handler named by the bag's func.
	 * <p>
	 * Does nothing when the session or bag is {@code null} or the bag carries a
	 * blank session value. If a different websocket session is already recorded
	 * for the same bag session value, that older session is closed (when still
	 * open) and replaced.
	 *
	 * @author mugui
	 * @param session websocket session that wants to listen
	 * @param bag     request describing the subscription
	 */
	public void sub(Session session, NetBag bag) {
		boolean usable = session != null && bag != null && !StringUtils.isBlank(bag.getSession());
		if (!usable) {
			return;
		}

		Session previous = user_sessions.get(bag.getSession());
		boolean replacesOther = previous != null && !previous.getId().equals(session.getId());
		if (replacesOther) {
			try {
				if (previous.isOpen()) {
					previous.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		user_sessions.put(bag.getSession(), session);

		WebSocketBean webSocketBean = getWebSocketBean(bag.getFunc());
		getSessionMap(getMap(webSocketBean.getType()), webSocketBean).put(session, bag);

		// Only has an effect for CYCLE handlers; see createCycleTask.
		createCycleTask(webSocketBean);
	}

	/**
	 * Cancels a session's subscription to the handler named by the bag's func.
	 * <p>
	 * Mirrors the argument guard of {@link #sub(Session, NetBag)}: a {@code null}
	 * session or bag is ignored instead of failing with a
	 * {@link NullPointerException} inside {@link ConcurrentHashMap#remove(Object)}.
	 *
	 * @author mugui
	 * @param session websocket session to unsubscribe
	 * @param bag     request naming the handler via its func
	 * @throws RuntimeException if no handler is registered for the bag's func
	 */
	public void unsub(Session session, NetBag bag) {
		if (session == null || bag == null) {
			return;
		}
		WebSocketBean webSocketBean = getWebSocketBean(bag.getFunc());
		ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>> map = getMap(webSocketBean.getType());
		ConcurrentHashMap<Session, NetBag> sessionMap = getSessionMap(map, webSocketBean);
		sessionMap.remove(session);
	}

	/** Produces the outgoing text for a {@link NetBag} through {@code WsHandle}; injected. */
	@Autowired
	private NetHandle nethandle;

	/** Registry queried by {@code getWebSocketBean} to resolve a handler from its key; injected. */
	@Autowired
	private WebSocketManager webSocketManager;

	/**
	 * Latest websocket session recorded for each {@code NetBag#getSession()} value.
	 * Stays {@code null} until {@link #init()} has run.
	 */
	private ConcurrentHashMap<String, Session> user_sessions = null;

	/**
	 * Prepares the component for use: creates the scheduled executor (10 core
	 * threads, named with the {@code Webscoket-Scheduled-} prefix), creates the
	 * user-session registry and registers a prune job with
	 * {@link GlobalPruneTimer} using a delay argument of 60000.
	 * <p>
	 * The prune job drops every recorded session that is no longer open. For each
	 * dropped session it also unregisters the matching sender thread and zeroes
	 * its {@code time} so that the thread's idle check stops it.
	 */
	public void init() {
		System.out.println(WebSocketTask.class.getName() + "加载");

		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10,
				ThreadFactoryBuilder.create().setNamePrefix("Webscoket-Scheduled-").build());
		executor.setRemoveOnCancelPolicy(true);
		scheduledThreadPoolExecutor = executor;

		user_sessions = new ConcurrentHashMap<>();

		Runnable pruneClosedSessions = () -> {
			System.out.println("user_sessions--->>>清理");
			for (Iterator<Entry<String, Session>> it = user_sessions.entrySet().iterator(); it.hasNext();) {
				Session candidate = it.next().getValue();
				if (candidate.isOpen()) {
					continue;
				}
				ThreadTask worker = THREAD_MAP.remove(candidate.getId());
				if (worker != null) {
					// A zero timestamp makes the worker's idle-timeout check fire.
					worker.time = 0;
				}
				it.remove();
			}
		};
		GlobalPruneTimer.INSTANCE.schedule(pruneClosedSessions, 60000);
	}

	/** Executor used for cycle tasks and asynchronous triggers; created in {@link #init()}. */
	private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;

	/**
	 * Cycle tasks that have been scheduled, keyed by {@link WebSocketBean#getValue()}.
	 * Only touched inside the {@code synchronized} section of {@code createCycleTask}.
	 */
	private final HashMap<String, WebSocketBeanTask> scheduledThreadPoolExecutorMap = new HashMap<>();;

	/**
	 * Schedules the periodic push task for a {@code CYCLE} handler, at most once
	 * per handler value. Handlers of any other type are ignored.
	 * <p>
	 * The task is recorded only after the executor has accepted it.
	 * {@code scheduleAtFixedRate} throws for a non-positive period or a shut-down
	 * executor; recording the task first would leave the handler marked as
	 * scheduled, so no later call would ever retry.
	 *
	 * @param webSocketBean handler whose {@code getBlank()} value is used as both
	 *                      initial delay and period, in milliseconds
	 */
	public void createCycleTask(WebSocketBean webSocketBean) {
		if (webSocketBean.getType() != WebSocket.CYCLE) {
			return;
		}
		synchronized (WebSocketTask.class) {
			if (scheduledThreadPoolExecutorMap.containsKey(webSocketBean.getValue())) {
				return;
			}
			WebSocketBeanTask task = new WebSocketBeanTask(webSocketBean, this);
			scheduledThreadPoolExecutor.scheduleAtFixedRate(task, webSocketBean.getBlank(),
					webSocketBean.getBlank(), TimeUnit.MILLISECONDS);
			scheduledThreadPoolExecutorMap.put(webSocketBean.getValue(), task);
		}
	}

	/**
	 * Periodic job for one {@code CYCLE} handler: on every run it pushes the
	 * handler's output to the sessions registered under the handler's value,
	 * using guaranteed delivery ({@code BIDA}).
	 */
	private static class WebSocketBeanTask implements Runnable {
		final WebSocketBean webSocketBean;
		final WebSocketTask webSocketTask;

		public WebSocketBeanTask(WebSocketBean webSocketBean, WebSocketTask webSocketTask) {
			this.webSocketTask = webSocketTask;
			this.webSocketBean = webSocketBean;
		}

		/**
		 * Runs one push round. Any exception is printed rather than propagated, so
		 * it never reaches the executor.
		 */
		@Override
		public void run() {
			try {
				ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>> cycleRegistry = webSocketTask
						.getMap(WebSocket.CYCLE);
				if (!cycleRegistry.isEmpty()) {
					webSocketTask.cycle(webSocketBean, cycleRegistry.get(webSocketBean.getValue()), null, null,
							BIDA);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	/**
	 * Intentionally empty; no implementation is needed here.
	 */
	@Deprecated
	@Override
	public void run() {
		// Original note: "create timers for all cycle tasks". Nothing is done here;
		// cycle tasks are scheduled by createCycleTask.

	}

	/**
	 * Delivery mode for packets that must arrive: {@code ThreadTask.add} always
	 * queues them.
	 */
	static final int BIDA = 0x0;
	/**
	 * Delivery mode for packets that need not arrive: {@code ThreadTask.add} skips
	 * the packet when the queue for the same mark still has pending entries.
	 */
	static final int NON_BIDA = 0x1;

	/**
	 * Pushes a handler's output to every session subscribed to it.
	 * <p>
	 * When the handler's configuration value is
	 * {@code WebSocketMethodConfBean.ONE}, the output is produced separately for
	 * each session. Otherwise it is produced once, from the first open session's
	 * bag, and reused for the rest. Sessions that are no longer open are removed
	 * from {@code hashMap} before any output is produced for them, so the handler
	 * is never invoked on behalf of a dead connection.
	 *
	 * @param webSocketBean handler whose configuration selects the mode
	 * @param hashMap       subscribed sessions and their stored bags; {@code null}
	 *                      or empty means nothing to do
	 * @param bag           optional bag whose data replaces the stored bag's data
	 *                      before the output is produced; may be {@code null}
	 * @param mark          queue mark handed to {@code sendData}; may be
	 *                      {@code null}
	 * @param type          {@link WebSocketTask#BIDA} or
	 *                      {@link WebSocketTask#NON_BIDA}
	 */
	private void cycle(WebSocketBean webSocketBean, ConcurrentHashMap<Session, NetBag> hashMap, NetBag bag, String mark,
			int type) {
		if (hashMap == null || hashMap.isEmpty()) {
			return;
		}
		WebSocketMethodConfBean conf = webSocketBean.getConf();
		boolean perSession = conf != null && conf.getValue() == WebSocketMethodConfBean.ONE;

		String shared = null;
		Iterator<Entry<Session, NetBag>> subscribers = hashMap.entrySet().iterator();
		while (subscribers.hasNext()) {
			Entry<Session, NetBag> subscriber = subscribers.next();
			Session session = subscriber.getKey();
			if (!session.isOpen()) {
				subscribers.remove();
				continue;
			}
			String udpHandle;
			if (perSession) {
				udpHandle = buildPayload(subscriber.getValue(), bag);
			} else {
				// Retried on the next session while the handler keeps returning null.
				if (shared == null) {
					shared = buildPayload(subscriber.getValue(), bag);
				}
				udpHandle = shared;
			}
			sendData(session, udpHandle, mark, type);
		}
	}

	/**
	 * Produces the outgoing text for one stored bag, first copying the data of
	 * {@code bag} into it when {@code bag} is not {@code null}.
	 */
	private String buildPayload(NetBag subscribed, NetBag bag) {
		if (bag != null) {
			subscribed.setData(bag.getData());
		}
		return nethandle.WsHandle(NetBag.newBean(subscribed));
	}

	/** Notified through {@code del} when a sender thread gives up on a session; injected. */
	@Autowired
	private WebSocketSessionManager webSocketSessionManager;

	/** Sender thread for each websocket session, keyed by {@code Session#getId()}. */
	@Getter
	private final ConcurrentHashMap<String, ThreadTask> THREAD_MAP = new ConcurrentHashMap<>();

	/** One queued message: the target session together with the text to send. */
	private final class TempBean {
		final Session key;
		final String udpHandle;

		public TempBean(Session key, String udpHandle) {
			this.udpHandle = udpHandle;
			this.key = key;
		}
	}

	/**
	 * Sender thread for one websocket session: drains the queued messages and
	 * writes them to the session, sleeping while there is nothing to send.
	 * <p>
	 * The thread stops when the session is closed, a send fails, it is
	 * interrupted, or more than 60 seconds have passed since {@link #time} with
	 * nothing pending. On exit it unregisters itself from {@code THREAD_MAP}.
	 * <p>
	 * Threading notes:
	 * <ul>
	 * <li>{@code bool} and {@code time} are written by other threads and are
	 * therefore volatile.</li>
	 * <li>Producers enqueue while holding the monitor of {@code map}, and the
	 * worker decides to sleep or to time out while holding the same monitor after
	 * re-checking the queues, so a message cannot be enqueued unnoticed.</li>
	 * </ul>
	 */
	final class ThreadTask extends Thread {
		/** Run flag; once cleared the worker exits and {@link #add} refuses new messages. */
		private volatile boolean bool = true;

		/** Pending messages keyed by {@code mark + "_" + type}; also the wait/notify monitor. */
		private final ConcurrentHashMap<String, ConcurrentLinkedQueue<TempBean>> map = new ConcurrentHashMap<>();

		/** Time of the last send attempt; the prune job writes 0 to force the idle timeout. */
		volatile long time = System.currentTimeMillis();

		/** Websocket session id this worker is registered under. */
		private final String id;

		public ThreadTask(String id) {
			this.id = id;
		}

		@Override
		public void run() {
			setName("WebSocket_ThreadTask");
			try {
				while (bool) {
					boolean sent = drainQueues();
					if (bool && !sent) {
						awaitWork();
					}
				}
			} finally {
				// Also reached on an unexpected exception, so producers never keep
				// feeding a dead worker.
				bool = false;
				System.out.println("关闭" + id);
				// Conditional removal: a replacement worker may already be registered
				// under the same id and must not be evicted.
				THREAD_MAP.remove(id, this);
				map.clear();
			}
		}

		/**
		 * Sends everything currently queued, stopping early when the worker has
		 * been told to stop.
		 *
		 * @return {@code true} if at least one message was taken from a queue
		 */
		private boolean drainQueues() {
			boolean sent = false;
			for (ConcurrentLinkedQueue<TempBean> queue : map.values()) {
				while (bool) {
					TempBean message = queue.poll();
					if (message == null) {
						break;
					}
					sent = true;
					if (!deliver(message)) {
						bool = false;
						return sent;
					}
				}
			}
			return sent;
		}

		/**
		 * Writes one message to its session.
		 *
		 * @return {@code true} if the text was sent; {@code false} if the session
		 *         was closed or the send failed, in which case the session has been
		 *         closed and reported to the session manager
		 */
		private boolean deliver(TempBean message) {
			Session key = message.key;
			try {
				if (key.isOpen()) {
					time = System.currentTimeMillis();
					key.getBasicRemote().sendText(message.udpHandle);
					return true;
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			try {
				try {
					key.close();
				} finally {
					// Report the session even when close() itself fails.
					webSocketSessionManager.del(key.getId());
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
			return false;
		}

		/**
		 * Sleeps until a producer enqueues something, the worker is stopped, or
		 * the idle timeout expires. An interrupt stops the worker and restores
		 * the interrupt flag.
		 */
		private void awaitWork() {
			synchronized (map) {
				while (bool && !hasPending()) {
					try {
						map.wait(1000);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						bool = false;
						return;
					}
					if (!hasPending() && System.currentTimeMillis() - time > 60000) {
						bool = false;
					}
				}
			}
		}

		/** Tells whether any queue currently holds a message. */
		private boolean hasPending() {
			for (ConcurrentLinkedQueue<TempBean> queue : map.values()) {
				if (!queue.isEmpty()) {
					return true;
				}
			}
			return false;
		}

		/**
		 * Queues a message for this worker.
		 *
		 * @param key       session to send to
		 * @param udpHandle text to send
		 * @param mark      queue mark; {@code null} is treated as the empty string
		 * @param type      {@link WebSocketTask#BIDA} always queues;
		 *                  {@link WebSocketTask#NON_BIDA} drops the message when the
		 *                  queue for the same mark still has pending entries
		 * @return {@code true} if the message was queued or deliberately dropped;
		 *         {@code false} if this worker has stopped and the caller must use
		 *         a new one
		 * @throws RuntimeException if {@code type} is neither of the two modes
		 */
		public boolean add(Session key, String udpHandle, String mark, int type) {
			if (!bool) {
				return false;
			}
			if (mark == null) {
				mark = "";
			}
			String queueKey = mark + "_" + type;
			ConcurrentLinkedQueue<TempBean> concurrentLinkedQueue = map.get(queueKey);
			switch (type) {
			case BIDA:
				break;
			case NON_BIDA:
				if (concurrentLinkedQueue != null && !concurrentLinkedQueue.isEmpty()) {
					return true;
				}
				break;

			default:
				throw new RuntimeException(
						"type :" + type + "错误" + "   key:" + key + " udpHandle" + udpHandle + " mark:" + mark);
			}
			if (concurrentLinkedQueue == null) {
				// Atomic create: concurrent producers must share a single queue.
				ConcurrentLinkedQueue<TempBean> created = new ConcurrentLinkedQueue<>();
				concurrentLinkedQueue = map.putIfAbsent(queueKey, created);
				if (concurrentLinkedQueue == null) {
					concurrentLinkedQueue = created;
				}
			}
			synchronized (map) {
				// Re-check under the monitor: the worker may have timed out meanwhile.
				if (!bool) {
					return false;
				}
				concurrentLinkedQueue.offer(new TempBean(key, udpHandle));
				map.notifyAll();
			}
			return true;
		}
	}

	/**
	 * Hands a message to the sender thread of the given session, starting a
	 * sender thread when none is registered or the registered one has stopped.
	 * <p>
	 * Creation and replacement happen under the {@code THREAD_MAP} monitor and
	 * re-read the map inside it, so concurrent callers cannot start two sender
	 * threads for the same session.
	 *
	 * @param key       session to send to
	 * @param udpHandle text to send
	 * @param mark      queue mark; may be {@code null}
	 * @param type      {@link WebSocketTask#BIDA} or
	 *                  {@link WebSocketTask#NON_BIDA}
	 */
	void sendData(Session key, String udpHandle, String mark, int type) {
		String sessionId = key.getId();
		ThreadTask thread = THREAD_MAP.get(sessionId);
		// Fast path: a live worker accepted the message.
		if (thread != null && thread.add(key, udpHandle, mark, type)) {
			return;
		}
		synchronized (THREAD_MAP) {
			thread = THREAD_MAP.get(sessionId);
			if (thread == null || !thread.add(key, udpHandle, mark, type)) {
				thread = new ThreadTask(sessionId);
				THREAD_MAP.put(sessionId, thread);
				thread.start();
				thread.add(key, udpHandle, mark, type);
			}
		}
	}

	/**
	 * Triggers the listeners of a websocket handler asynchronously, without a
	 * custom bag. "Async" here refers only to asynchronous processing at the
	 * module layer.
	 * 
	 * @author mugui
	 * @param key handler key
	 */
	public void triggerAsync(String key) {
		triggerAsync(key, null);
	}

	/**
	 * Triggers the listeners of a websocket handler on the calling thread,
	 * without a custom bag. "Sync" here refers only to synchronous processing at
	 * the module control layer.
	 * 
	 * @author mugui
	 * @param key handler key
	 */
	public void triggerBasic(String key) {
		triggerBasic(key, null);
	}

	/**
	 * Triggers the listeners of a websocket handler asynchronously, passing a
	 * custom bag; uses the {@link #BIDA} mode and no mark. "Async" here refers
	 * only to asynchronous processing at the module layer.
	 * 
	 * @author mugui
	 * @param key handler key
	 * @param bag custom bag handed to the handler; may be {@code null}
	 */
	public void triggerAsync(String key, NetBag bag) {
		triggerAsync(key, bag, null, BIDA);
	}

	/**
	 * Triggers the listeners of a websocket handler on the calling thread,
	 * passing a custom bag; uses the {@link #BIDA} mode and no mark. "Sync" here
	 * refers only to synchronous processing at the module control layer.
	 * 
	 * @author mugui
	 * @param key handler key
	 * @param bag custom bag handed to the handler; may be {@code null}
	 */
	public void triggerBasic(String key, NetBag bag) {
		triggerBasic(key, bag, null, BIDA);
	}

	/**
	 * Triggers a func for a single user, without extra parameters.
	 * 
	 * @author mugui
	 * @param sessionId unique id of the user
	 * @param func      key of the handler to trigger
	 */
	public void sendDate(String sessionId, String func) {
		sendDate(sessionId, func, null);
	}

	/**
	 * Triggers a func for a single user with specific parameters; requests the
	 * {@link #BIDA} mode and no mark.
	 * 
	 * @author mugui
	 * @param sessionId unique id of the user
	 * @param func      key of the handler to trigger
	 * @param bag       bag to be processed; may be {@code null}
	 */
	public void sendDate(String sessionId, String func, NetBag bag) {
		sendDate(sessionId, func, bag, null, BIDA);
	}

	/**
	 * Pushes a ready-made response bag straight to a single user.
	 * <p>
	 * Requests the guaranteed-delivery mode {@link #BIDA}, in line with the other
	 * {@code sendDate} overloads; the non-guaranteed counterpart is
	 * {@link #sendNonDate(String, NetBag, String)}.
	 * 
	 * @author mugui
	 * @param sessionId  unique id of the user
	 * @param return_bag bag whose {@code toString()} is sent
	 */
	public void sendDate(String sessionId, NetBag return_bag) {
		sendDate(sessionId, return_bag, null, BIDA);
	}

	/*
	 * *********************************** The methods below handle
	 * non-guaranteed (NON_BIDA) packets.
	 */

	/**
	 * Pushes a ready-made bag to a user as a packet that need not arrive.
	 * 
	 * @author mugui
	 * @param sessionId  session id of the user
	 * @param return_bag data to send to the user
	 * @param mark       packet mark
	 */
	public void sendNonDate(String sessionId, NetBag return_bag, String mark) {
		sendDate(sessionId, return_bag, mark, NON_BIDA);
	}

	/**
	 * Triggers a func for a user with specific parameters, as a packet that need
	 * not arrive.
	 * 
	 * @author mugui
	 * @param sessionId session id of the user
	 * @param func      key of the handler to trigger
	 * @param bag       bag to be processed; may be {@code null}
	 * @param mark      packet mark
	 */
	public void sendNonDate(String sessionId, String func, NetBag bag, String mark) {
		sendDate(sessionId, func, bag, mark, NON_BIDA);
	}

	/**
	 * Triggers a func for a user without extra parameters, as a packet that need
	 * not arrive.
	 * 
	 * @author mugui
	 * @param sessionId session id of the user
	 * @param func      key of the handler to trigger
	 * @param mark      packet mark
	 */
	public void sendNonDate(String sessionId, String func, String mark) {
		sendNonDate(sessionId, func, null, mark);
	}

	/**
	 * Triggers a handler's listeners on the calling thread without a custom bag,
	 * as packets that need not arrive.
	 * 
	 * @author mugui
	 * @param key  handler key
	 * @param mark packet mark
	 */
	public void triggerNonBasic(String key, String mark) {
		triggerNonBasic(key, null, mark);
	}

	/**
	 * Triggers a handler's listeners on the calling thread with a custom bag, as
	 * packets that need not arrive.
	 * 
	 * @author mugui
	 * @param key  handler key
	 * @param bag  custom bag handed to the handler; may be {@code null}
	 * @param mark packet mark
	 */
	public void triggerNonBasic(String key, NetBag bag, String mark) {
		triggerBasic(key, bag, mark, NON_BIDA);
	}

	/**
	 * Triggers a handler's listeners asynchronously without a custom bag, as
	 * packets that need not arrive.
	 * 
	 * @author mugui
	 * @param key  handler key
	 * @param mark packet mark
	 */
	public void triggerNonAsync(String key, String mark) {
		triggerNonAsync(key, null, mark);
	}

	/**
	 * Triggers a handler's listeners asynchronously with a custom bag, as packets
	 * that need not arrive.
	 * 
	 * @author mugui
	 * @param key  handler key
	 * @param bag  custom bag handed to the handler; may be {@code null}
	 * @param mark packet mark
	 */
	public void triggerNonAsync(String key, NetBag bag, String mark) {
		triggerAsync(key, bag, mark, NON_BIDA);
	}

	/**
	 * Submits a push round for the handler's {@code TRIGGER} subscribers to the
	 * scheduled executor. Nothing is submitted when the handler has no
	 * subscribers.
	 *
	 * @param key  handler key
	 * @param bag  optional custom bag; may be {@code null}
	 * @param mark queue mark; may be {@code null}
	 * @param type {@link WebSocketTask#BIDA} or {@link WebSocketTask#NON_BIDA}
	 * @throws RuntimeException if no handler is registered for {@code key}
	 */
	private void triggerAsync(String key, NetBag bag, String mark, int type) {
		WebSocketBean webSocketBean = getWebSocketBean(key);
		ConcurrentHashMap<Session, NetBag> sessionMap = getSessionMap(getMap(WebSocket.TRIGGER), webSocketBean);
		if (!sessionMap.isEmpty()) {
			scheduledThreadPoolExecutor.execute(() -> cycle(webSocketBean, sessionMap, bag, mark, type));
		}
	}

	/**
	 * Runs a push round for the handler's {@code TRIGGER} subscribers on the
	 * calling thread. Returns without doing anything when the handler has no
	 * subscribers.
	 *
	 * @param key  handler key
	 * @param bag  optional custom bag; may be {@code null}
	 * @param mark queue mark; may be {@code null}
	 * @param type {@link WebSocketTask#BIDA} or {@link WebSocketTask#NON_BIDA}
	 * @throws RuntimeException if no handler is registered for {@code key}
	 */
	private void triggerBasic(String key, NetBag bag, String mark, int type) {
		WebSocketBean webSocketBean = getWebSocketBean(key);
		ConcurrentHashMap<Session, NetBag> sessionMap = getSessionMap(getMap(WebSocket.TRIGGER), webSocketBean);
		if (!sessionMap.isEmpty()) {
			cycle(webSocketBean, sessionMap, bag, mark, type);
		}
	}

	/**
	 * Triggers a func for a single user and queues the result with the requested
	 * delivery mode.
	 * <p>
	 * Returns without sending when the handler has no {@code TRIGGER}
	 * subscribers, when no session is recorded for {@code sessionId}, or when that
	 * session is not subscribed to the handler. The {@code type} argument is
	 * passed on to {@code sendData}; it used to be ignored in favour of a
	 * hard-coded {@link WebSocketTask#NON_BIDA}.
	 * 
	 * @author mugui
	 * @param sessionId unique id of the user
	 * @param func      key of the handler to trigger
	 * @param bag       optional bag whose data replaces the stored bag's data; may
	 *                  be {@code null}
	 * @param mark      queue mark; may be {@code null}
	 * @param type      {@link WebSocketTask#BIDA} or
	 *                  {@link WebSocketTask#NON_BIDA}
	 * @throws RuntimeException if no handler is registered for {@code func}
	 */
	private void sendDate(String sessionId, String func, NetBag bag, String mark, int type) {
		WebSocketBean webSocketBean = getWebSocketBean(func);

		ConcurrentHashMap<String, ConcurrentHashMap<Session, NetBag>> concurrentHashMap = getMap(WebSocket.TRIGGER);

		ConcurrentHashMap<Session, NetBag> sessionMap = getSessionMap(concurrentHashMap, webSocketBean);
		if (sessionMap.isEmpty()) {
			return;
		}
		Session session = user_sessions.get(sessionId);
		if (session == null) {
			return;
		}
		NetBag netBag = sessionMap.get(session);
		if (netBag == null) {
			return;
		}
		if (bag != null) {
			netBag.setData(bag.getData());
		}
		String udpHandle = nethandle.WsHandle(NetBag.newBean(netBag));
		sendData(session, udpHandle, mark, type);
	}

	/**
	 * Pushes a ready-made bag to a single user with the requested delivery mode.
	 * <p>
	 * Returns without sending when no session is recorded for {@code sessionId}.
	 * The {@code type} argument is passed on to {@code sendData}; it used to be
	 * ignored in favour of a hard-coded {@link WebSocketTask#NON_BIDA}.
	 * 
	 * @author mugui
	 * @param sessionId  session id of the user
	 * @param return_bag bag whose {@code toString()} is sent
	 * @param mark       packet mark; may be {@code null}
	 * @param type       {@link WebSocketTask#BIDA} or
	 *                   {@link WebSocketTask#NON_BIDA}
	 */
	private void sendDate(String sessionId, NetBag return_bag, String mark, int type) {
		Session session = user_sessions.get(sessionId);
		if (session == null) {
			return;
		}
		sendData(session, return_bag.toString(), mark, type);
	}

}
